一、概述
Flume是的一个分布式、高可用、高可靠的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时提供了对数据进行简单处理并写到各种数据接收方的能力。
Flume的设计原理是基于数据流的,能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。 Flume能够做到近似实时的推送,并且可以满足数据量是持续且量级很大的情况。比如它可以收集社交网站日志,并将这些数量庞大的日志数据从网站服务器上汇集起来,存储到HDFS或 HBase分布式数据库中。
Flume的应用场景:比如一个电商网站,想从网站访问者中访问一些特定的节点区域来分析消费者的购物意图和行为。为了实现这一点,需要收集到消费者访问的页面以及点击的产品等日志信息,并移交到大数据 Hadoop平台上去分析,可以利用 Flume做到这一点。现在流行的内容推送,比如广告定点投放以及新闻私人定制也是基于这个道理。
1.1 相关概念
Event
事件是Flume内部数据传输的最基本单元,将传输的数据进行封装。事件本身是由一个载有数据的字节数组和可选的headers头部信息构成,如下图所示。Flume以事件的形式将数据从源头传输到最终的目的地。
Agent
Flume Agent 是一个JVM进程,通过三个组件(source、channel、sink)将事件流从一个外部数据源收集并发送给下一个目的地。
Source
从数据发生器接收数据,并将数据以Flume的Event格式传递给一个或多个通道(Channel)
支持Source:
- Avro Source
- Thrift Source
- Exec Source
- JMS Source
- Spooling Directory Source
- Taildir Source
- Twitter 1% firehose Source (experimental)
- Kafka Source
- NetCat TCP Source
- NetCat UDP Source
- Sequence Generator Source
- Syslog Sources
- HTTP Source
- Stress Source
- Legacy Sources
- Custom Source
Channel
一种短暂的存储容器,位于 Source和Sink之间,起着桥梁的作用。 Channel将从Source处接收到的 Event格式的数据缓存起来,当Sink成功地将 Events发送到下一跳的Channel或最终目的地后, Events从 Channel移除。Channel是一个完整的事务,这一点保证了数据在收发的时候的一致性。可以把 Channel看成一个FIFO(先进先出)队列,当数据的获取速率超过流出速率时,将Event保存到队列中,再从队中一个个出来。
有以下几种Channel:
- Memory Channel 事件存储在可配置容量的内存队列中,队列容量即为可存储最大事件数量,适用于高吞吐量场景,在agent出现错误时有可能会丢失部分数据
- File Channel 基于文件系统的持久化存储
- Spillable Memory Channel 内存和文件混合Channel,当内存队列满了之后,新的事件会存储在文件系统,目前处于实验阶段,不建议在生产环境中使用
- JDBC Channe 事件存储在持久化的数据库中,目前只支持Derby
- Kafka Channel 事件存储在Kafka集群中
- Pseudo Transaction Channel 伪事务Channel,仅用于测试,不能在生产环境使用
- Custom Channel 自定义Channel
Sink
获取Channel暂时保存的数据并进行处理。sink从channel中移除事件,并将其发送到下一个agent(简称下一跳)或者事件的最终目的地,比如HDFS。
Sink分类:
- HDFS Sink
- Hive Sink
- Logger Sink
- Avro Sink
- Thrift Sink
- IRC Sink
- File Roll Sink 将Events保存在本地文件系统
- Null Sink 抛弃从Channel接收的所有事件
- HBaseSinks
- MorphlineSolrSink
- ElasticSearchSink
- Kite Dataset Sink
- Kafka Sink
- HTTP Sink
- Custom Sink
1.2 流程
(1)外部数据源(Web Server)将Flume可识别的 Event发送到 Source (2) Source收到 Event事件后存储到一个或多个Channel通道中。 (3)Channel保留 Event直到Sink将其处理完毕。 (4)Sink从 Channel中取出数据,并将其传输至外部存储(HDFS)。
1.3 可靠性
事件在每个agent的channel中短暂存储,然后事件被发送到下一个agent或者最终目的地。事件只有在存储在下一个channel或者最终存储后才从当前的channel中删除。
Flume使用事务的办法来保证Events的可靠传递。Source和Sink分别被封装在事务中,事务由保存Event的存储或者Channel提供。这就保证了Event在数据流的点对点传输中是可靠的。在多跳的数据流中,上一跳的sink和下一跳的source均运行事务来保证数据被安全地存储到下一跳的channel中。
二、SDK使用
2.1使用Demo
自定义Source
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation, convert to another type, ...)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external client
}
@Override
public void stop () {
// Disconnect from external client and do any additional cleanup
// (e.g. releasing resources or nulling-out field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
try {
// This try clause includes whatever Channel/Event operations you want to do
// Receive new data
Event e = getSomeData();
// Store the Event into this Source's associated Channel(s)
getChannelProcessor().processEvent(e);
status = Status.READY;
} catch (Throwable t) {
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
return status;
}
}添加MySource
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
// 处理数据
public Status process() throws EventDeliveryException {
Status status = null;
try {
// 接收新数据
for (int i = 0; i < 10; i++) {
Event e = new SimpleEvent();
e.setBody(("data:"+i).getBytes());
// 将数据存储到与Source关联的Channel中
getChannelProcessor().processEvent(e);
status = Status.READY;
}
Thread.sleep(5000);
} catch (Throwable t) {
// 打印日志
status = Status.BACKOFF;
// 抛出异常
if (t instanceof Error) {
throw (Error)t;
}
} finally {
}
return status;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
public void configure(Context context) {
}
}添加mySourceAgent.conf
# 定义agent名称为a1
<NolebasePageProperties />
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为mysource
a1.sources.r1.type = com.itheima.flume.source.MySource
# 配置sink类型为Logger
a1.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1启动Flume
flume-ng agent -n a1 -c conf -f mySourceAgent.conf自定义Sink
public class MySink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}添加MySink,可以参考LoggerSink
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(MySink.class);
public Status process() throws EventDeliveryException {
Status status = null;
// 开启事务
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
try {
txn.begin();
// 从channel中获取数据
Event event = ch.take();
if(event==null){
status = Status.BACKOFF;
}
// 将事件发送到外部存储
// storeSomeData(e);
// 打印事件
logger.info(new String(event.getBody()));
// 提交事务
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// 打印异常日志
status = Status.BACKOFF;
// 抛出异常
if (t instanceof Error) {
throw (Error)t;
}
}finally {
txn.close();
}
return status;
}
public void configure(Context context) {
}
}修改mySourceAgent.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为mysource
a1.sources.r1.type = com.itheima.flume.source.MySource
# 配置sink类型为MySink
a1.sinks.k1.type = com.itheima.flume.sink.MySink
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1三、测试Flume
3.1 Netcat测试Flume
使用Flume监听某个端口,使用Netcat向这个端口发送数据,Flume将接收到的数据打印到控制台。
Netcat是一款TCP/UDP测试工具,可以通过以下命令安装
yum install -y nc必须属性
属性名 默认值 说明 channels – type – netcatbind – 绑定的主机名或者IP地址 port – 绑定端口 必须属性
属性名 默认值 说明 type – memory必须属性
属性名 默认值 说明 channel – type – logger
添加配置文件
在apache-flume-1.9.0-bin/conf目录下添加配置文件example.conf
# example.conf: 单节点Flume配置
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink类型为Logger
a1.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1启动flume
查看Flume使用命令
flume-ng help启动agent
flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console或者
flume-ng agent -n a1 -c conf -f example.conf -Dflume.root.logger=INFO,console# 将绑定端口配置为IP地址,绑定为localhost或者127.0.0.1在另外一台机器上无法连接
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.85.132
a1.sources.r1.port = 44444如果开启了防火墙,需要添加防火墙规则
firewall-cmd --zone=public --add-port=44444/tcp --permanent
firewall-cmd --reload3.2 使用telnet测试
# 查看有无安装
yum list | grep telnet
# 安装
yum install -y telnet.x86_64
yum install -y telnet-server.x86_64
# 启动测试
telnet 127.0.0.1 44444四、重要组件
4.1 数据持久化 File Channel
使用组件
属性设置
| 属性名 | 默认值 | 说明 |
|---|---|---|
| type | – | file |
| checkpointDir | ~/.flume/file-channel/checkpoint | 检查点文件存放路径 |
| dataDirs | ~/.flume/file-channel/data | 日志存储路径,多个路径使用逗号分隔. 使用不同的磁盘上的多个路径能提高file channel的性能 |
配置
添加配置文件file-channel.conf,添加一个FileChannel
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
# 多个channel使用空格分隔
a1.channels = c1 c2
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink类型为Logger
a1.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置FileChannel,checkpointDir为检查点文件存储目录,dataDirs为日志数据存储目录,
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/soft/bak/flume/checkpoint
a1.channels.c2.dataDirs = /opt/soft/bak/flume/data
# 将source和sink绑定到channel上
# source同时绑定到c1和c2上
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1为了方便日志打印,可以将-Dflume.root.logger=INFO,console添加在conf的环境配置中,从模板复制一份配置
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
# 添加JAVA_OPTS
export JAVA_OPTS="-Dflume.root.logger=INFO,console"启动验证
启动Flume
flume-ng agent -n a1 -c ./ -f file-channnel.conf通过Netcat发送数据,,此时发送到c2的数据没有被消费,关闭Flume,修改配置文件
# 将sink绑定到c2上
a1.sinks.k1.channel = c2重启Flume,可以看到会重新消费c2的数据
4.2 日志文件监控 Exec Soucre
企业中应用程序部署后会将日志写入到文件中,可以使用Flume从各个日志文件将日志收集到日志中心以便于查找和分析。
Exec Soucre
Exec Source通过指定命令监控文件的变化,加粗属性为必须设置的。
| 属性名 | 默认值 | 说明 |
|---|---|---|
| channels | – | |
| type | – | exec |
| command | – | 要执行的命令 |
| restart | false | 如果执行命令挂了是否要重启 |
| batchSize | 20 | 同时往channel发送的最大行数 |
| batchTimeout | 3000 | 批量发送超时时间 |
| selector.type | replicating | channel选择器replicating 或者 multiplexing |
| selector.* | 通道选择器匹配属性 | |
| interceptors | – | 拦截器 |
| interceptors.* |
配置
添加配置文件exec-log.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为exec,命令为 tail -F app.log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F app.log
# 配置sink类型为Logger
a1.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1启动验证
启动Flume
flume-ng agent -n a1 -c conf -f file-log.conf -Dflume.root.logger=INFO,console通过命令更新app.log文件
echo "abcdef">> app.log可以查看agent控制台接收到了最新的日志
4.3 解决重复消费 Taildir Source
| 属性名 | 默认值 | 说明 |
|---|---|---|
| channels | – | |
| type | – | TAILDIR. |
| filegroups | – | 可以定义多个组. 每个组里包含一序列被监控的文件 |
filegroups.<filegroupName> | – | 被监控文件的绝对路径,文件名支持正则表达式 |
| positionFile | ~/.flume/taildir_position.json | 记录监控文件的绝对路径、上次读取位置的json文件 |
配置
新增dir-log.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为TAILDIR
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/soft/flume/position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/soft/apache-flume-1.9.0-bin/conf/app.log
a1.sources.r1.filegroups.f2 = /opt/soft/apache-flume-1.9.0-bin/conf/applogs/.*log
# 配置sink类型为Logger
a1.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c14.4 多个agent模型
将多个Flume agent 程序连接在一起,其中一个agent的sink将数据发送到另一个agent的source。Avro文件格式是使用Flume通过网络发送数据的标准方法。从多个Web服务器收集日志,发送到一个或多个集中处理的agent,之后再发往日志存储中心.同样的日志发送到不同的目的地。
第一个agent从Netcat接收数据,增加一个channel和sink,将这个sink发送到第二个agent
第二个agent在监控文件变化的同时监控从sink发送来的事件,最终输出到控制台
Avro Sink属性
使用Avro Sink,必须设置以下属性
| 属性名 | 默认值 | Description |
|---|---|---|
| channel | – | |
| type | – | avro |
| hostname | – | 绑定的主机名或者IP地址 |
| port | – | 监听端口 |
Avro Source 属性
使用Avro Source,必须设置以下属性
| 属性名 | 默认值 | 说明 |
|---|---|---|
| channels | – | |
| type | – | avro |
| bind | – | 绑定的主机名或者IP地址 |
| port | – | 监听端口 |
配置
添加agent1配置文件
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink1类型为Logger
a1.sinks.k1.type = logger
# 配置sink2类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 55555
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2添加agent2配置文件
# 定义agent名称为a2
# 设置3个组件的名称
a2.sources = r1 r2
a2.sinks = k1
a2.channels = c1
# 配置source类型为exec,命令为 tail -F app.log
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F app.log
# 配置source类型为avro
a2.sources.r2.type = avro
a2.sources.r2.bind = 192.168.85.132
a2.sources.r2.port = 55555
# 配置sink类型为Logger
a2.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a2.sources.r1.channels = c1
a2.sources.r2.channels = c1
a2.sinks.k1.channel = c1启动验证
启动agent1和agent2
flume-ng agent -n a1 -c conf -f agent1.conf
flume-ng agent -n a2 -c conf -f agent2.conf先往app.log中写入日志,可以在agent2看到最新数据
打开Netcat连接到44444,发送数据,可以同时在agent1和agent2看到最新数据。
4.5 使用Flume导入数据到HDFS
属性
数据导出到HDFS需要使用HDFS Sink,需要配置属性如下:
| 属性名 | 默认值 | 说明 |
|---|---|---|
| channel | – | |
| type | – | hdfs |
| hdfs.path | – | HDFS 文件路径 (例如 hdfs://namenode/flume/webdata/) |
| hdfs.fileType | SequenceFile | 文件格式: SequenceFile, DataStream or CompressedStream (1)DataStream 不会压缩输出文件且不用设置 codeC (2)CompressedStream 需要设置 hdfs.codeC |
| hdfs.codeC | 压缩格式 : gzip, bzip2, lzo, lzop, snappy |
注:使用HDFS Sink需要用到Hadoop的多个包,可以在装有Hadoop的主机上运行Flume,如果是单独部署的Flume,可以通过多个Agent的形式将单独部署的Flume Agent 日志数据发送到装有Hadoop的Flume Agent上。
配置
创建hdfs.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink类型为hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:9000/user/flume/logs
a1.sinks.k1.hdfs.fileType = DataStream
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1启动验证
启动flume
bin/flume-ng agent --conf conf/ --conf-file conf/hdfs.conf -Dfile.root.logger=debug,info,console --name hdfs注:如果出现com.google.common.base.Preconditions.checkArgument 查看下flume/lib目录下
的guava.jar版本是否与hadoop/share/hadoop/common/lib中的版本是否一致,不一致拷贝新版
在后台查看
hadoop fs -cat /user/flume/messages/flume-.1578835130630五、拦截器
拦截器可以修改或者丢弃事件,Flume支持链式调用拦截器,拦截器定义在source中
5.1 Host Interceptor
这个拦截器将运行agent的hostname 或者 IP地址写入到事件的headers中
| 属性名 | 默认值 | 说明 |
|---|---|---|
| type | – | host |
| preserveExisting | false | 如果header已经存在host, 是否要保留 - true保留原始的,false写入当前机器 |
| useIP | true | true为IP地址, false为 hostname. |
| hostHeader | host | header中key的名称 |
打开另外一台虚拟机,安装好Flume
在flume/conf目录下新建app.log文件
touch app.log配置
添加agent3.conf,这个agent监控本地的app.log,将数据发送到虚拟机132上
# 定义agent名称为a3
# 设置3个组件的名称
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 配置source类型为exec,命令为 tail -F app.log
a3.sources.r1.type = exec
a3.sources.r1.command = tail -F app.log
# 配置拦截器为host
a3.sources.r1.interceptors = i1
a3.sources.r1.interceptors.i1.type = host
# 配置sink类型为avro
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = 192.168.85.132
a3.sinks.k1.port = 55555
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1这里注意需要在132上开启防火墙端口
firewall-cmd --zone=public --add-port=55555/tcp --permanent
firewall-cmd --reload启动验证
启动agent3
flume-ng agent -n a3 -c conf -f agent3.conf可以在132的agent2控制台看到agent3已经连接成功
往135的app.log中写入数据
echo "data from 135" >> app.log可以在agent2上看到添加了headers
修改agent2.conf,添加拦截器
# 配置拦截器
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = host
a2.sources.r2.interceptors.i1.preserveExisting = false可以看到135发送来的事件中header被修改成了本机的
5.2 Timestamp Interceptor
这个拦截器将当前时间写入到事件的headers中
| 属性名 | 默认值 | 说明 |
|---|---|---|
| type | – | timestamp |
| headerName | timestamp | header中key的名称 |
| preserveExisting | false | If the timestamp already exists, should it be preserved - true or false |
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp5.3 Static Interceptor
运行用户对所有的事件添加固定的header
| 属性名 | 默认值 | 说明 |
|---|---|---|
| type | – | static |
| preserveExisting | true | If configured header already exists, should it be preserved - true or false |
| key | key | header 中key名称 |
| value | value | header 中value值 |
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK5.4 UUID Interceptor
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder5.5 Search and Replace Interceptor
a1.sources.r1.interceptors = i1 i2 i3 i4 i5
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i2.type = timestamp
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = datacenter
a1.sources.r1.interceptors.i3.value = NEW_YORK
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i5.type = search_replace
a1.sources.r1.interceptors.i5.searchPattern = \\d{6}
a1.sources.r1.interceptors.i5.replaceString = ******5.6 自定义拦截器
依赖
xml<dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency>自定义拦截器
javaimport org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List; public class MyInterceptor implements Interceptor { private static final Logger logger = LoggerFactory .getLogger(MyInterceptor.class); public void initialize() { } /** * 拦截单个事件 * @param event * @return */ public Event intercept(Event event) { String host = event.getHeaders().get("host"); logger.info("接收到host为:"+host); if (host.equals("192.168.85.135")) { logger.info("丢弃事件"); return null; } return event; } public List<Event> intercept(List<Event> list) { List<Event> newList = new ArrayList<Event>(); for (Event event : list) { event = intercept(event); if(event!=null){ newList.add(event); } } return newList; } public void close() { } public static class Builder implements Interceptor.Builder{ public Interceptor build() { return new MyInterceptor(); } public void configure(Context context) { } } }将项目打成jar包后复制到Flume安装目录的lib目录中,并修改agent2.conf
shell# 配置拦截器 a2.sources.r2.interceptors = i1 a2.sources.r2.interceptors.i1.type = com.itheima.flume.interceptor.MyInterceptor$Builder
六、Channel选择器
6.1 Replicating Channel Selector
复制选择器,如果没有指定,这个为默认选择器
可选属性如下
| 属性名 | 默认值 | 说明 |
|---|---|---|
| selector.type | replicating | replicating |
| selector.optional | – | optional |
使用案例:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3上面的配置中,c3是一个可选的channel,写入c3失败的话会被忽略,c1和c2没有标记为可选,如果写入c1和c2失败会导致事务的失败。
6.2 Multiplexing Channel Selector
多路channel选择器,可选属性如下
| 属性名 | 默认值 | 说明 |
|---|---|---|
| selector.type | replicating | multiplexing |
| selector.header | flume.selector.header | 键值Key |
| selector.default | – | |
| selector.mapping.* | – | 路由 |
使用案例:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4这里通过事件的header值来判断将事件发送到哪个channel,可以配合拦截器一起使用。
# 创建multichannel目录
mkdir multichannel
cd multichannel创建agent1.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2 c3 c4
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink1类型为Logger
a1.sinks.k1.type = logger
# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1 c2 c3 c4
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4添加agent2.conf
# 定义agent名称为a2
# 设置3个组件的名称
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 配置source类型为avro
a2.sources.r1.type = avro
a2.sources.r1.bind = 192.168.85.132
a2.sources.r1.port = 4040
# 配置sink类型为logger
a2.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1添加agent3.conf
# 定义agent名称为a3
# 设置3个组件的名称
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 配置source类型为avro
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.85.132
a3.sources.r1.port = 4041
# 配置sink类型为logger
a3.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1添加agent4.conf
# 定义agent名称为a4
# 设置3个组件的名称
a4.sources = r1
a4.sinks = k1
a4.channels = c1
# 配置source类型为avro
a4.sources.r1.type = avro
a4.sources.r1.bind = 192.168.85.132
a4.sources.r1.port = 4042
# 配置sink类型为logger
a4.sinks.k1.type = logger
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1启动agent2、agent3、agent4和agent1
flume-ng agent -n a2 -c conf -f agent2.conf
flume-ng agent -n a3 -c conf -f agent3.conf
flume-ng agent -n a4 -c conf -f agent4.conf
flume-ng agent -n a1 -c conf -f agent1.conf使用Netcat往agent1发送消息,可以在agent2\3\4上看到消息
修改agent1.conf,配置通道选择器
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ
# 配置通道选择器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1 c2
a1.sources.r1.selector.mapping.US = c1 c3
a1.sources.r1.selector.default = c1 c4运行所有agent,发送数据到agent1,可以看到只有agent2收到了数据,修改拦截器的值为US,结果是agent3收到数据。
七、Sink处理器
可以将多个sink放入到一个组中,Sink处理器能够对一个组中所有的sink进行负载均衡,在一个sink出现临时错误时进行故障转移。
必须设置属性:
| 属性名 | 默认值 | 说明 |
|---|---|---|
| sinks | – | 组中多个sink使用空格分隔 |
| processor.type | default | default, failover 或load_balance |
举例:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover7.1 Default Sink Processor
默认的Sink处理器只支持单个Sink
7.2 Failover Sink Processor
故障转移处理器维护了一个带有优先级的sink列表,故障转移机制将失败的sink放入到一个冷却池中,如果sink成功发送了事件,将其放入到活跃池中,sink可以设置优先级,数字越高,优先级越高,如果一个sink发送事件失败,下一个有更高优先级的sink将被用来发送事件,比如,优先级100的比优先级80的先被使用,如果没有设置优先级,按配置文件中配置的顺序决定。设置属性如下:
| 属性名 | 默认值 | 说明 |
|---|---|---|
| sinks | – | 组内多个sinks空格分隔 |
| processor.type | default | failover |
| processor.priority. | – | 优先级 |
| processor.maxpenalty | 30000 | 失败sink的最大冷却时间 |
示例如下:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000修改以上agent1.conf
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 15
a1.sinkgroups.g1.processor.priority.k4 = 20
a1.sinkgroups.g1.processor.maxpenalty = 10000
# 配置sink1类型为Logger
a1.sinks.k1.type = logger
# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1启动agent2\3\4和agent1,通过Netcat发送消息到agent1,可以看到消息一直发送给其中一个agent(agent3优先级最高),将这个agent关闭后,消息会发送到其他agent。
7.3 Load balancing Sink Processor
负载均衡处理器,可以通过轮询或者随机的方式进行负载均衡,也可以通过继承AbstractSinkSelector 自定义负载均衡,设置属性如下:
| 属性名 | 默认值 | 说明 |
|---|---|---|
| processor.sinks | – | 组内多个sinks空格分隔 |
| processor.type | default | load_balance |
| processor.backoff | false | 是否将失败的sink加入黑名单 |
| processor.selector | round_robin | 轮询机制:round_robin, random 或者自定义 |
| processor.selector.maxTimeOut | 30000 | 黑名单有效时间(单位毫秒) |
示例如下:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin修改上面的agent1.conf,将所有的sink都绑定到c1上
# 定义agent名称为a1
# 设置3个组件的名称
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# 配置source类型为NetCat,监听地址为本机,端口为44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# 配置sink1类型为Logger
a1.sinks.k1.type = logger
# 配置sink2,3,4类型为Avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.85.132
a1.sinks.k2.port = 4040
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = 192.168.85.132
a1.sinks.k3.port = 4041
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = 192.168.85.132
a1.sinks.k4.port = 4042
# 配置channel类型为内存,内存队列最大容量为1000,一个事务中从source接收的Events数量或者发送给sink的Events数量最大为100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 将source和sink绑定到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1八、Ganglia监控
Ganglia是UC Berkeley发起的一个开源集群监视项目,设计用于测量数以千计的节点。Ganglia的核心包含gmond(监控守护进程)、gmetad(元数据守护进程)以及一个Web前端。主要是用来监控系统性能,如:cpu 、mem、硬盘利用率, I/O负载、网络流量情况等,通过曲线很容易见到每个节点的工作状态,对合理调整、分配系统资源,提高系统整体性能起到重要作用。
8.1 Ganglia安装
中心节点的安装
- epel包的安装:yum install -y epel-release(解决不能yum安装某些安装包的问题)
- gmetad的安装:yum install -y ganglia-gmetad
- gmond的安装:yum install -y ganglia-gmond
- rrdtool的安装:yum install -y rrdtool
- httpd服务器的安装:yum install -y httpd
- ganglia-web及php安装:yum install -y ganglia-web php
被监测节点的安装
- epel包的安装:yum install -y epel-release(解决不能yum安装某些安装包的问题)
- gmond的安装:yum install -y gmond(提示找不到,感觉应该换成上面那个yum install -y ganglia-gmond)
8.2 Ganglia配置
中心节点的配置
安装目录说明
- ganglia配置文件目录:/etc/ganglia
- rrd数据库存放目录:/var/lib/ganglia/rrds
- ganglia-web安装目录:/usr/share/ganglia
- ganglia-web配置目录:/etc/httpd/conf.d/ganglia.conf
相关配置文件修改 将ganglia-web的站点目录连接到httpd主站点目录
$ ln -s /usr/share/ganglia /var/www/html修改httpd主站点目录下ganglia站点目录的访问权限 将ganglia站点目录访问权限改为apache:apache,否则会报错
$ chown -R apache:apache /var/www/html/ganglia
$ chmod -R 755 /var/www/html/ganglia修改rrd数据库存放目录访问权限 将rrd数据库存放目录访问权限改为nobody:nobody,否则会报错
$ chown -R nobody:nobody /var/lib/ganglia/rrds修改ganglia-web的访问权限: 修改/etc/httpd/conf.d/ganglia.conf
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Require all granted
#Require ip 10.1.2.3
#Require host example.org
</Location>修改dwoo下面的权限
chmod 777 /var/lib/ganglia/dwoo/compiled
chmod 777 /var/lib/ganglia/dwoo/cache配置/etc/ganglia/gmetad.conf
data_source "my cluster" 192.168.85.132:8649(注意是所有节点都加上,如master:8649 slave0x:8649)
setuid_username nobody配置/etc/ganglia/gmond.conf
cluster {
name = "node01"
...
}
udp_send_channel {
# the host who gather this cluster's monitoring data and send these data to gmetad node
#注释掉多播模式的,以下出现这个都要注释掉
#mcast_join = 239.2.11.71
#添加单播模式的
host = 192.168.85.132
port = 8649
}
udp_recv_channel {
bind = 192.168.85.132
port = 8649
}
tcp_accept_channel {
port = 8649
}被监测节点的配置
配置/etc/ganglia/gmond.conf
cluster {
name = "hadoop cluster"
...
}
udp_send_channel {
# the host who gather this cluster's monitoring data and send these data to gmetad node
host = 192.168.26.139
port = 8649
}
udp_recv_channel {
port = 8649
}
tcp_accept_channel {
port = 8649
}8.3 Ganglia启动
中心节点的启动
start httpd, gmetad, gmond
$ systemctl start httpd.service
$ systemctl start gmetad.service
$ systemctl start gmond.service
$ systemctl enable httpd.service
$ systemctl enable gmetad.service
$ systemctl enable gmond.service被监测节点的启动
start gmond
$ systemctl start gmond.service
$ systemctl enable gmond.service关闭selinux
vi /etc/selinux/config,把SELINUX=enforcing改成SELINUX=disable;该方法需要重启机器。 可以使用命令setenforce 0来关闭selinux而不需要重启,刷新页面,即可访问;不过此法只是权宜之计,如果想永久修改selinux设置,还是要使用第一种方法
开启防火墙
firewall-cmd --zone=public --add-port=80/tcp --permanent
firewall-cmd --reload访问网页
浏览器访问 {namenode的ip}/ganglia即可
8.4 添加Flume监控
修改flume-env.sh配置,添加虚拟机选项
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.85.132:8649 -Xms100m -Xmx200m"8.5 启动flume
flume-ng agent -n a1 -c conf -f example.conf -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.85.132:8649查看ganglia页面
| 字段(图表名称) | 字段含义 |
|---|---|
| EventPutAttemptCount | source尝试写入channel的事件总数量 |
| EventPutSuccessCount | 成功写入channel且提交的事件总数量 |
| EventTakeAttemptCount | sink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据。 |
| EventTakeSuccessCount | sink成功读取的事件的总数量 |
| StartTime | channel启动的时间(毫秒) |
| StopTime | channel停止的时间(毫秒) |
| ChannelSize | 目前channel中事件的总数量 |
| ChannelFillPercentage | channel占用百分比 |
| ChannelCapacity | channel的容量 |
九、安装
9.1 常规安装
Flume下载页面:http://flume.apache.org/download.html
# 解压命令
tar xzf apache-flume-1.9.0-bin.tar.gz
# 进入到apache-flume-1.9.0-bin 目录
cd apache-flume-1.9.0-bin
# Flume使用需要依赖JDK1.8以上环境,确保已安装
# 将Flume安装目录配置到PATH中,方便在任意目录使用
vi /etc/profile
# 添加以下内容
export JAVA_HOME=/opt/soft/jdk1.8.0_231
export JRE_HOME=/opt/soft/jdk1.8.0_231/jre
export FLUME_HOME=/opt/soft/apache-flume-1.9.0-bin
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLUME_HOME/bin:
# 保存成功后刷新
source /etc/profile
# 查看是否设置成功
echo $FLUME_HOME